In [1]:
import ray

Starting Ray

There are two main ways in which Ray can be used. First, you can start all of the relevant Ray processes and shut them all down within the scope of a single script. Second, you can connect to and use an existing Ray cluster.

Starting and stopping a cluster within a script

One use case is to start all of the relevant Ray processes when you call ray.init and shut them down when the script exits. These processes include local and global schedulers, an object manager, a Redis server, and more.

This can be done as follows. Note that this approach is limited to a single machine.


In [2]:
ray.init()


Waiting for redis server at 127.0.0.1:45530 to respond...
Waiting for redis server at 127.0.0.1:10864 to respond...
Starting local scheduler with 8 CPUs, 0 GPUs

======================================================================
View the web UI at http://localhost:8899/notebooks/ray_ui63087.ipynb?token=2c837501cb66d580b7d847a2409a27b9202f0b0640fe13bc
======================================================================

Out[2]:
{'local_scheduler_socket_names': ['/tmp/scheduler8744153'],
 'node_ip_address': '127.0.0.1',
 'object_store_addresses': [ObjectStoreAddress(name='/tmp/plasma_store47693603', manager_name='/tmp/plasma_manager76799956', manager_port=65482)],
 'redis_address': '127.0.0.1:45530',
 'webui_url': 'http://localhost:8899/notebooks/ray_ui63087.ipynb?token=2c837501cb66d580b7d847a2409a27b9202f0b0640fe13bc'}

If there are GPUs available on the machine, you should specify this with the num_gpus argument. Similarly, you can specify the number of CPUs with num_cpus.


In [3]:
# Commented out because Ray has already been started in this session.
# ray.init(num_cpus=20, num_gpus=2)

By default, Ray uses psutil.cpu_count() to determine the number of CPUs, and the number of GPUs is zero.

Instead of thinking about the number of "worker" processes on each node, we prefer to think in terms of the quantities of CPU and GPU resources on each node and to provide the illusion of an infinite pool of workers. Tasks will be assigned to workers based on the availability of resources so as to avoid contention and not based on the number of available worker processes.
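To make the resource-based view concrete, here is a minimal, hypothetical sketch of one greedy placement round on a single node. The `Task` class and `schedule` function are invented for illustration and are not Ray's scheduler: a task starts only if its CPU and GPU demands fit into what is still free, independent of how many worker processes exist.

```python
from dataclasses import dataclass


@dataclass
class Task:
    """Hypothetical task description: a name plus its resource demands."""
    name: str
    num_cpus: int = 1
    num_gpus: int = 0


def schedule(tasks, total_cpus, total_gpus):
    """Greedy toy scheduler (not Ray's): start every queued task whose demand
    fits in the resources still free; return started names and the waiting queue."""
    free_cpus, free_gpus = total_cpus, total_gpus
    started, waiting = [], []
    for task in tasks:
        if task.num_cpus <= free_cpus and task.num_gpus <= free_gpus:
            # Reserve the task's resources for as long as it runs.
            free_cpus -= task.num_cpus
            free_gpus -= task.num_gpus
            started.append(task.name)
        else:
            # Not enough free resources right now; the task stays queued.
            waiting.append(task)
    return started, waiting


tasks = [
    Task("a", num_cpus=4),
    Task("b", num_cpus=2, num_gpus=1),
    Task("c", num_cpus=4),
    Task("d", num_cpus=2),
]
started, waiting = schedule(tasks, total_cpus=8, total_gpus=1)
print(started, [t.name for t in waiting])
```

Here task `c` waits not because no worker process is available, but because only 2 CPUs are free when it is considered.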

Connecting to an existing cluster

Once a Ray cluster has been started, the only thing you need in order to connect to it is the address of the cluster's Redis server. In this case, your script will not start up or shut down any processes, and the cluster and all of its processes may be shared between multiple scripts and multiple users. Connecting can be done with a command like the following.


In [4]:
# ray.init(redis_address="12.345.67.89:6379")

In this case, you cannot specify num_cpus or num_gpus in ray.init because that information is passed into the cluster when the cluster is started, not when your script is started.

ray.init(redis_address=None, node_ip_address=None, object_id_seed=None, num_workers=None, driver_mode=0, redirect_output=False, num_cpus=None, num_gpus=None, num_custom_resource=None, num_redis_shards=None, plasma_directory=None, huge_pages=False)

Parameters

  • redis_address (str) - The address of the Redis server to connect to. If this address is not provided, then this command will start Redis, a global scheduler, a local scheduler, a plasma store, a plasma manager, and some workers. It will also kill these processes when Python exits.

  • object_id_seed (int) - Used to seed the deterministic generation of object IDs. The same value can be used across multiple runs of the same job in order to generate the object IDs in a consistent manner. However, the same seed should not be used for different jobs.

  • num_workers (int) - The number of workers to start. This should only be provided if redis_address is not provided.

  • driver_mode - The mode in which to start the driver. This should be one of ray.SCRIPT_MODE, ray.PYTHON_MODE, or ray.SILENT_MODE.

  • redirect_output (bool) - True if stdout and stderr for all the processes should be redirected to files and False otherwise.

  • num_cpus (int) - Number of CPUs the user wishes all local schedulers to be configured with.

  • num_gpus (int) - Number of GPUs the user wishes all local schedulers to be configured with.

  • num_custom_resource (int) - The quantity of a user-defined custom resource that the local scheduler should be configured with. This flag is experimental and is subject to change in the future.

  • num_redis_shards - The number of Redis shards to start in addition to the primary Redis shard.

  • plasma_directory - A directory where the Plasma memory mapped files will be created.

  • huge_pages - Boolean flag indicating whether to start the Object Store with hugetlbfs support. Requires plasma_directory.

Returns

Address information about the started processes.

Raises

Exception - An exception is raised if an inappropriate combination of arguments is passed in.

Defining Remote Functions

Remote functions are used to create tasks. To define a remote function, the @ray.remote decorator is placed over the function definition.

The function can then be invoked with f.remote. Invoking the function creates a task which will be scheduled on and executed by some worker process in the Ray cluster. The call will return an object ID (essentially a future) representing the eventual return value of the task. Anyone with the object ID can retrieve its value, regardless of where the task was executed.

When a task executes, its outputs will be serialized into a string of bytes and stored in the object store.

Note that arguments to remote functions can be values or object IDs.


In [5]:
@ray.remote
def f(x):
    return x+1

x_id = f.remote(0)
ray.get(x_id)  # 1

#y_id = f.remote(x_id)
#ray.get(y_id)  # 2


Out[5]:
1
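An object ID behaves much like a future. As a rough analogy only (this is Python's standard `concurrent.futures`, not Ray), the sketch below mirrors `f.remote(0)`, `f.remote(x_id)` and `ray.get`. The `submit` helper is hypothetical: it resolves future arguments by blocking inside the worker thread, whereas Ray resolves object ID arguments to their values before the task runs.

```python
from concurrent.futures import Future, ThreadPoolExecutor


def f(x):
    return x + 1


def submit(pool, fn, *args):
    """Hypothetical helper: run fn on the pool, first replacing any Future
    arguments with their results (loosely like Ray accepting object IDs)."""
    def run():
        resolved = [a.result() if isinstance(a, Future) else a for a in args]
        return fn(*resolved)
    return pool.submit(run)


with ThreadPoolExecutor(max_workers=4) as pool:
    x_future = submit(pool, f, 0)         # analogous to f.remote(0)
    y_future = submit(pool, f, x_future)  # analogous to f.remote(x_id)
    x_value = x_future.result()           # analogous to ray.get(x_id)
    y_value = y_future.result()           # analogous to ray.get(y_id)

print(x_value, y_value)  # 1 2
```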

If you want a remote function to return multiple object IDs, you can do that by passing the num_return_vals argument into the remote decorator.


In [6]:
@ray.remote(num_return_vals=2)
def f():
    return 1,2

x_id, y_id = f.remote()
ray.get(x_id) #1
#ray.get(y_id) #2


Out[6]:
1

ray.remote(*args, **kwargs)

This decorator is used to define remote functions and to define actors.

Parameters

  • num_return_vals (int) - The number of object IDs that a call to this function should return.

  • num_cpus (int) - The number of CPUs needed to execute this function.

  • num_gpus (int) - The number of GPUs needed to execute this function.

  • num_custom_resource (int) - The quantity of a user-defined custom resource that is needed to execute this function. This flag is experimental and is subject to change in the future.

  • max_calls (int) - The maximum number of tasks of this kind that can be run on a worker before the worker needs to be restarted.

  • checkpoint_interval (int) - The number of tasks to run between checkpoints of the actor state.

Getting Values from Object IDs

Object IDs can be converted into objects by calling ray.get on the object ID. Note that ray.get accepts either a single object ID or a list of object IDs.


In [7]:
@ray.remote
def f():
    return {'key1': ['value']}

# Get one object ID.
ray.get(f.remote())  # {'key1': ['value']}

# Get a list of object IDs.
ray.get([f.remote() for _ in range(2)])  # [{'key1': ['value']}, {'key1': ['value']}]


Out[7]:
[{'key1': ['value']}, {'key1': ['value']}]

Numpy arrays

Use numpy arrays whenever possible, because the object store handles them efficiently.

Any numpy arrays that are part of the serialized object will not be copied out of the object store. They will remain in the object store and the resulting deserialized object will simply have a pointer to the relevant place in the object store’s memory.

Since objects in the object store are immutable, this means that if you want to mutate a numpy array that was returned by a remote function, you will have to first copy it.
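A minimal NumPy-only sketch of the copy-before-mutate rule, assuming the deserialized array is exposed as read-only. The `setflags(write=False)` call stands in for the immutable object store memory; it is an assumption for illustration, not what Ray does internally.

```python
import numpy as np

# Stand-in for an array backed by immutable object store memory (assumption).
shared = np.zeros(5)
shared.setflags(write=False)

error_message = ""
try:
    shared[0] = 1.0  # in-place mutation of read-only memory fails
except ValueError as exc:
    error_message = str(exc)

# Make a private, writable copy before mutating.
local = shared.copy()
local[0] = 1.0

print(error_message)          # assignment destination is read-only
print(local[0], shared[0])    # 1.0 0.0
```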

ray.get(object_ids, worker=<ray.worker.Worker object>)

Get a remote object or a list of remote objects from the object store.

This method blocks until the object corresponding to the object ID is available in the local object store. If this object is not in the local object store, it will be shipped from an object store that has it (once the object has been created). If object_ids is a list, then the objects corresponding to each object ID in the list will be returned.

Parameters

  • object_ids - Object ID of the object to get or a list of object IDs to get.

Returns

A Python object or a list of Python objects.

Putting Objects in the Object Store

The primary way that objects are placed in the object store is by being returned by a task. However, it is also possible to directly place objects in the object store using ray.put.


In [8]:
x_id = ray.put(1)
ray.get(x_id)  # 1


Out[8]:
1

The main reason to use ray.put is to pass the same large object into a number of tasks. By first calling ray.put and then passing the resulting object ID into each of the tasks, the large object is copied into the object store only once, whereas passing the object in directly copies it once per task, which is inefficient.


In [9]:
import numpy as np

@ray.remote
def f(x):
    pass

x = np.zeros(10 ** 6)

# Alternative 1: Here, x is copied into the object store 10 times.
[f.remote(x) for _ in range(10)]

# Alternative 2: Here, x is copied into the object store once.
x_id = ray.put(x)
[f.remote(x_id) for _ in range(10)]


Out[9]:
[ObjectID(7a6e2f40283a3509b1af5cb4102af8b307128c80),
 ObjectID(a3b373167e62e8a90bbed86f198e6c9f74e72675),
 ObjectID(2ec2575e40e4d547df08bc7cd5bf11488887abf3),
 ObjectID(c97709651e3a8af1e67d014f199f582ec1b7d768),
 ObjectID(608d2755fdda2c66738badf1298e8e4800b6e716),
 ObjectID(17e5c7856aa9a8116e7df7b74f821be7a11afe70),
 ObjectID(7261af1696431be9e55c8a27ae0968d3501a0cf6),
 ObjectID(17e7c39466222de89f1317cbcd08b4f5a003fcf5),
 ObjectID(9249f0e3037f102049d32e1eb3751da5e1c14509),
 ObjectID(e65d1627e3935451c925baea662f5e372a588878)]
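To see the difference in copy counts without running a cluster, the following hypothetical sketch models an object store as a dictionary of pickled values. `ToyObjectStore`, `ToyObjectID` and `submit_task` are invented for illustration; the only Ray behavior they mimic is the one described above, namely that a plain argument is copied into the store for every task while an object ID is simply passed through.

```python
import pickle
from dataclasses import dataclass
from itertools import count


@dataclass(frozen=True)
class ToyObjectID:
    """Hypothetical object ID: just a unique integer."""
    value: int


class ToyObjectStore:
    """Hypothetical stand-in for an object store (not Ray's implementation)."""

    def __init__(self):
        self._objects = {}
        self._ids = count()

    def put(self, value):
        # Every put serializes a fresh copy of the value.
        object_id = ToyObjectID(next(self._ids))
        self._objects[object_id] = pickle.dumps(value)
        return object_id

    def get(self, object_id):
        return pickle.loads(self._objects[object_id])

    def num_copies(self):
        return len(self._objects)


def submit_task(store, arg):
    """Hypothetical task submission: plain values are first put into the
    store (like the implicit ray.put); object IDs are passed through."""
    if not isinstance(arg, ToyObjectID):
        arg = store.put(arg)
    return arg


x = list(range(100_000))  # stand-in for a large array

# Alternative 1: pass the value directly to each of 10 tasks.
store1 = ToyObjectStore()
for _ in range(10):
    submit_task(store1, x)

# Alternative 2: put the value once and pass its ID to each task.
store2 = ToyObjectStore()
x_id = store2.put(x)
for _ in range(10):
    submit_task(store2, x_id)

print(store1.num_copies(), store2.num_copies())  # 10 1
```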

Note that ray.put is called under the hood in a couple of situations.

  • It is called on the values returned by a task.
  • It is called on the arguments to a task, unless the arguments are Python primitives like integers or short strings, lists, tuples, or dictionaries.

ray.put(value, worker=<ray.worker.Worker object>)

Store an object in the object store.

Parameters

  • value - The Python object to be stored.

Returns

The object ID assigned to this value.

Waiting for a Subset of Tasks to Finish

It is often desirable to adapt the computation being done based on when different tasks finish. For example, if a number of tasks each take a variable length of time, and their results can be processed in any order, then it makes sense to simply process the results in the order that they finish. In other settings, it makes sense to discard straggler tasks whose results may turn out to be negligible for the overall computation (for example, in dynamic resource allocation).

To do this, we introduce the ray.wait primitive, which takes a list of object IDs and returns when a subset of them are available. By default it blocks until a single object is available, but the num_returns value can be specified to wait for a different number. If a timeout argument is passed in, it will block for at most that many milliseconds and may return a list with fewer than num_returns elements.

The ray.wait function returns two lists. The first list is a list of object IDs of available objects (of length at most num_returns), and the second list is a list of the remaining object IDs, so the combination of these two lists is equal to the list passed in to ray.wait (up to ordering).
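The semantics described above can be sketched on top of Python's standard `concurrent.futures`. The `wait_subset` helper below is hypothetical, not Ray's implementation: it returns at most `num_returns` finished futures plus all the others, stops early once a timeout given in milliseconds expires, and keeps the input order in both lists. The task durations are scaled down from the Ray example below so the sketch runs quickly.

```python
import time
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def wait_subset(futures, num_returns=1, timeout_ms=None):
    """Hypothetical ray.wait-style helper built on concurrent.futures.

    Returns (ready, remaining): at most num_returns finished futures, and all
    other futures, preserving the input order in both lists."""
    if num_returns > len(futures):
        raise ValueError("num_returns cannot exceed the number of futures")
    deadline = None if timeout_ms is None else time.monotonic() + timeout_ms / 1000
    while True:
        ready = [fut for fut in futures if fut.done()]
        if len(ready) >= num_returns:
            break
        time_left = None if deadline is None else deadline - time.monotonic()
        if time_left is not None and time_left <= 0:
            break
        pending = [fut for fut in futures if not fut.done()]
        # Block until at least one more future finishes or the time runs out.
        wait(pending, timeout=time_left, return_when=FIRST_COMPLETED)
    ready = ready[:num_returns]
    remaining = [fut for fut in futures if fut not in ready]
    return ready, remaining


def task(n):
    time.sleep(n)
    return n


with ThreadPoolExecutor(max_workers=5) as pool:
    # Durations 0, 0.3, 0.6, 0.9 and 1.2 seconds.
    futures = [pool.submit(task, 0.3 * i) for i in range(5)]
    # After 750 ms only the first three tasks have finished.
    ready, remaining = wait_subset(futures, num_returns=4, timeout_ms=750)
    # Without a timeout, block until four tasks are done.
    ready_4, remaining_4 = wait_subset(futures, num_returns=4)

print(len(ready), len(remaining), len(ready_4), len(remaining_4))  # 3 2 4 1
```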


In [10]:
import time
import numpy as np

@ray.remote
def f(n):
    time.sleep(n)
    return n

# # Start 3 tasks with different durations.
# results = [f.remote(i) for i in range(3)]
# # Block until 2 of them have finished.
# ready_ids, remaining_ids = ray.wait(results, num_returns=2)

# Start 5 tasks with different durations.
results = [f.remote(i) for i in range(5)]
# Block until 4 of them have finished or 2.5 seconds pass.
ready_ids, remaining_ids = ray.wait(results, num_returns=4, timeout=2500)
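# Block again until 4 have finished or 4 seconds pass; the task sleeping for
# 3 seconds completes within this window, so 4 IDs should be ready.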
ready_ids_4, remaining_ids_4 = ray.wait(results, num_returns=4, timeout=4000)

In [11]:
ready_ids


Out[11]:
[ObjectID(211bfbe194af83f086d927bf571ed27948cf4623),
 ObjectID(73eb10386ed354843091f9484f695995705fb7a0),
 ObjectID(d2b54ccdbba509a50b4e48d3c4291edb98e3a131)]

In [12]:
remaining_ids


Out[12]:
[ObjectID(270c401fbcf695f6be8c4a94fdcce4a827d0c8d0),
 ObjectID(bea711fe8694a14c808608f2d7a5fd45b252e441)]

In [13]:
ready_ids_4


Out[13]:
[ObjectID(211bfbe194af83f086d927bf571ed27948cf4623),
 ObjectID(73eb10386ed354843091f9484f695995705fb7a0),
 ObjectID(d2b54ccdbba509a50b4e48d3c4291edb98e3a131),
 ObjectID(bea711fe8694a14c808608f2d7a5fd45b252e441)]

In [14]:
remaining_ids_4


Out[14]:
[ObjectID(270c401fbcf695f6be8c4a94fdcce4a827d0c8d0)]

It is easy to use this construct to create an infinite loop in which multiple tasks are executing, and whenever one task finishes, a new one is launched.


In [23]:
@ray.remote
def f():
    return 10

# Start 5 tasks.
remaining_ids = [f.remote() for i in range(5)]
#print(ray.get(remaining_ids))
'''
The following few lines are for testing the behavior of the wait method
when num_returns is changed or left unspecified. Keep them commented out
when running the for loop below, which is taken from the original documentation.
'''
# # Block until 3 of the 5 tasks have finished.
# ready_ids, remaining_ids = ray.wait(remaining_ids, num_returns=3)
# # Get the available objects and do something with them.
# print(ray.get(ready_ids))
# # Print out the values of the remaining objects.
# print(ray.get(remaining_ids))

'''
Observation: when the number of object IDs to be returned is not specified,
the wait method defaults to num_returns=1.
'''
for _ in range(10):
    # This keeps the number of in-flight tasks constant only when
    # num_returns=1. With a larger num_returns, only one new task is
    # launched per iteration, so the list of remaining IDs shrinks until
    # ray.wait is asked for more IDs than remain (a dead kernel here).
    ready_ids, remaining_ids = ray.wait(remaining_ids)
    #print(ray.get(remaining_ids))
    # Get the available object and do something with it.
    print(ray.get(ready_ids))
    # Start a new task so that the number of remaining IDs stays constant.
    remaining_ids.append(f.remote())


[10]
[10]
[10]
[10]
[10]
[10]
[10]
[10]
[10]
[10]
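The same keep-the-pool-full pattern can be written with Python's standard `concurrent.futures.wait`, whose `FIRST_COMPLETED` mode plays roughly the role of `ray.wait` with `num_returns=1`. This is an analogy, not Ray code; since `wait` may report several finished futures at once, the sketch takes one and puts the rest back, so exactly five tasks stay in flight.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def f():
    return 10


results = []
with ThreadPoolExecutor(max_workers=5) as pool:
    # Start 5 tasks.
    remaining = {pool.submit(f) for _ in range(5)}
    for _ in range(10):
        # Block until at least one task has finished.
        done, remaining = wait(remaining, return_when=FIRST_COMPLETED)
        finished = done.pop()
        remaining |= done  # put any other finished futures back
        results.append(finished.result())
        # Start a new task so that five stay in flight.
        remaining.add(pool.submit(f))

print(results)         # [10, 10, 10, 10, 10, 10, 10, 10, 10, 10]
print(len(remaining))  # 5
```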

ray.wait(object_ids, num_returns=1, timeout=None, worker=<ray.worker.Worker object>)

Return a list of IDs that are ready and a list of IDs that are not.

If timeout is set, the function returns either when the requested number of IDs are ready or when the timeout is reached, whichever occurs first. If it is not set, the function simply waits until that number of objects is ready and returns that exact number of object IDs.

This method returns two lists. The first list consists of object IDs that correspond to objects that are stored in the object store. The second list corresponds to the rest of the object IDs (which may or may not be ready).

Parameters

  • object_ids (List[ObjectID]) - List of object IDs for objects that may or may not be ready. Note that these IDs must be unique.
  • num_returns (int) - The number of object IDs that should be returned.
  • timeout (int) - The maximum amount of time in milliseconds to wait before returning.

Returns

A list of object IDs that are ready and a list of the remaining object IDs.

Viewing errors

Keeping track of errors that occur in different processes throughout a cluster can be challenging. There are a couple of mechanisms to help with this.

  1. If a task throws an exception, that exception will be printed in the background of the driver process.

  2. If ray.get is called on an object ID whose parent task threw an exception before creating the object, the exception will be re-raised by ray.get.

The errors will also be accumulated in Redis and can be accessed with ray.error_info. Normally, you shouldn't need to do this, but it is possible.
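The second mechanism, re-raising a task's failure when its result is requested, has a close analogue in Python's standard `concurrent.futures`, sketched below. This is not Ray code, and it has no counterpart to the background printing or to ray.error_info.

```python
from concurrent.futures import ThreadPoolExecutor


def f():
    raise Exception("This task failed!!")


with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(f)
    # exception() waits for the task and returns its exception without raising it.
    stored_error = future.exception()
    try:
        future.result()  # re-raises the task's exception, much like ray.get
    except Exception as exc:
        caught_message = str(exc)

print(caught_message)  # This task failed!!
```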


In [24]:
@ray.remote
def f():
    raise Exception("This task failed!!")
    
f.remote()  # An error message will be printed in the background.

# Wait for the error to propagate to Redis.
import time
time.sleep(1)

ray.error_info()  # This returns a list containing the error message.


Remote function __main__.f failed with:

Traceback (most recent call last):
  File "<ipython-input-24-d214de529b7d>", line 3, in f
Exception: This task failed!!


  You can inspect errors by running

      ray.error_info()

  If this driver is hanging, start a new one with

      ray.init(redis_address="127.0.0.1:45530")
  
Out[24]:
[{b'data': b"{'function_id': b'\\x13\\xea=\\x05J\\xf6\\xb0\\x16\\xbc\\xa9\\xe4\\xb1\\xb6\\xc8\\xd7\\xb7\\x88\\xa0\\xc4\\xe9', 'function_name': '__main__.f'}",
  b'message': b'Remote function \x1b[31m__main__.f\x1b[39m failed with:\n\nTraceback (most recent call last):\n  File "<ipython-input-24-d214de529b7d>", line 3, in f\nException: This task failed!!\n',
  b'type': b'task'}]

ray.error_info(worker=<ray.worker.Worker object>)

Return information about failed tasks.